from abc import ABCMeta, abstractmethod
import math

from ray.serve.config import AutoscalingConfig
from ray.serve.constants import CONTROL_LOOP_PERIOD_S

from typing import List


def calculate_desired_num_replicas(autoscaling_config: AutoscalingConfig,
                                   current_num_ongoing_requests: List[float]
                                   ) -> int:
    """Compute the replica count that the given load metrics call for.

    Args:
        autoscaling_config: The autoscaling parameters to use for this
            calculation.  The target number of ongoing requests per
            replica, the smoothing factor and the min/max replica bounds
            are read from it.
        current_num_ongoing_requests (List[float]): One entry per replica,
            giving that replica's number of ongoing requests.  Each entry
            is assumed to be already time-averaged over the desired
            lookback window.

    Returns:
        The desired number of replicas, clamped so that
        min_replicas <= result <= max_replicas.

    Raises:
        ValueError: If `current_num_ongoing_requests` is empty.
    """
    replica_count = len(current_num_ongoing_requests)
    if not replica_count:
        raise ValueError("Number of replicas cannot be zero")

    # Mean number of ongoing requests across all replicas.
    mean_ongoing: float = sum(current_num_ongoing_requests) / replica_count

    # How far off target we are.  A ratio of 2.0 means each replica carries
    # twice the target load, so twice as many replicas are wanted.
    target = autoscaling_config.target_num_ongoing_requests_per_replica
    load_ratio: float = mean_ongoing / target

    # Scale the distance from 1 by the smoothing ("gain") factor (default=1).
    gain = autoscaling_config.smoothing_factor
    damped_ratio = 1 + ((load_ratio - 1) * gain)

    proposed = math.ceil(replica_count * damped_ratio)

    # Clamp from above first, then from below, so that min_replicas wins if
    # the two bounds ever conflict.
    capped = min(autoscaling_config.max_replicas, proposed)
    return max(autoscaling_config.min_replicas, capped)


class AutoscalingPolicy(metaclass=ABCMeta):
    """Defines the interface for an autoscaling policy.

    To add a new autoscaling policy, a class should be defined that provides
    this interface. The class may be stateful, in which case it may also want
    to provide a non-default constructor. However, this state will be lost when
    the controller recovers from a failure.

    This is an abstract base class: it cannot be instantiated directly, and
    subclasses must override `get_decision_num_replicas`.
    """

    # NOTE: The metaclass is given in the class header.  A class-level
    # `__metaclass__` attribute is ignored by Python 3, which would leave
    # `@abstractmethod` unenforced.

    def __init__(self, config: AutoscalingConfig):
        """Initialize the policy using the specified autoscaling config.

        Args:
            config: The autoscaling parameters; stored as `self.config`.
        """
        self.config = config

    @abstractmethod
    def get_decision_num_replicas(self,
                                  current_num_ongoing_requests: List[float],
                                  curr_num_replicas: int) -> int:
        """Make a decision to scale backends.

        Arguments:
            current_num_ongoing_requests: List[float]: List of number of
                ongoing requests for each replica.
            curr_num_replicas (int): The number of replicas that the backend
                currently has.

        Returns:
            int: The new number of replicas to scale this backend to.  The
                base implementation returns `curr_num_replicas` unchanged
                (i.e. "do not scale"); it is only reachable through
                `super()` from an overriding subclass.
        """
        return curr_num_replicas


class BasicAutoscalingPolicy(AutoscalingPolicy):
    """The default autoscaling policy, driven by the desired replica count.

    Each period, the desired number of replicas is computed from the
    per-replica ongoing-request metrics (see
    `calculate_desired_num_replicas`) and compared with the current number
    of replicas, yielding a 'scale up', 'scale down' or 'do nothing'
    decision.  The same scale decision must be made for a configured number
    of periods in a row (derived from `upscale_delay_s` /
    `downscale_delay_s`) before the number of replicas is actually changed.
    Assumes `get_decision_num_replicas` is called once every
    CONTROL_LOOP_PERIOD_S seconds.
    """

    def __init__(self, config: AutoscalingConfig):
        """Initialize the policy and derive the consecutive-period limits.

        Args:
            config: The autoscaling parameters.  `upscale_delay_s` and
                `downscale_delay_s` are converted into a number of control
                loop periods.
        """
        super().__init__(config)
        # TODO(architkulkarni): Make configurable via AutoscalingConfig
        self.loop_period_s = CONTROL_LOOP_PERIOD_S
        self.scale_up_consecutive_periods = self._num_periods(
            config.upscale_delay_s, self.loop_period_s)
        self.scale_down_consecutive_periods = self._num_periods(
            config.downscale_delay_s, self.loop_period_s)

        # Keeps track of previous decisions. Each time the desired number of
        # replicas is above the current number, the counter is incremented
        # and each time it is below, the counter is decremented. When the
        # two are equal, the direction flips, or a scaling decision is
        # made, the counter is reset to 0.
        # TODO(architkulkarni): It may be too noisy to reset the counter each
        # time the direction changes, especially if we calculate frequently
        # (like every 0.1s).  A potentially less noisy option is to not reset
        # the counter, and instead only increment/decrement until we reach
        # scale_up_periods or scale_down_periods.
        self.decision_counter = 0

    @staticmethod
    def _num_periods(delay_s: float, period_s: float) -> int:
        """Return the number of whole control-loop periods in `delay_s`.

        The quotient is rounded to 9 decimal places before truncating so
        that floating-point representation error does not drop a period
        (e.g. 0.3 / 0.1 evaluates to 2.9999999999999996, which a bare
        int() would truncate to 2 instead of 3).  Delays that are not a
        multiple of the period are still truncated toward zero.
        """
        return int(round(delay_s / period_s, 9))

    def get_decision_num_replicas(self,
                                  current_num_ongoing_requests: List[float],
                                  curr_num_replicas: int) -> int:
        """Decide how many replicas to run, applying the scale delays.

        Arguments:
            current_num_ongoing_requests: List[float]: List of number of
                ongoing requests for each replica.
            curr_num_replicas (int): The number of replicas that the backend
                currently has.

        Returns:
            int: The desired number of replicas once the same scale
                direction has been chosen for more than the configured
                number of consecutive periods; otherwise
                `curr_num_replicas`.
        """
        # No metrics to act on: keep the current size and leave the
        # decision counter untouched.
        if len(current_num_ongoing_requests) == 0:
            return curr_num_replicas

        decision_num_replicas = curr_num_replicas

        desired_num_replicas = calculate_desired_num_replicas(
            self.config, current_num_ongoing_requests)
        # Scale up.
        if desired_num_replicas > curr_num_replicas:
            # If the previous decision was to scale down (the counter was
            # negative), we reset it and then increment it (set to 1).
            # Otherwise, just increment.
            if self.decision_counter < 0:
                self.decision_counter = 0
            self.decision_counter += 1

            # Only actually scale the replicas if we've made this decision
            # for more than 'scale_up_consecutive_periods' in a row.
            if self.decision_counter > self.scale_up_consecutive_periods:
                self.decision_counter = 0
                decision_num_replicas = desired_num_replicas

        # Scale down.
        elif desired_num_replicas < curr_num_replicas:
            # If the previous decision was to scale up (the counter was
            # positive), reset it to zero before decrementing.
            if self.decision_counter > 0:
                self.decision_counter = 0
            self.decision_counter -= 1

            # Only actually scale the replicas if we've made this decision
            # for more than 'scale_down_consecutive_periods' in a row.
            if self.decision_counter < -self.scale_down_consecutive_periods:
                # TODO(architkulkarni): curr_replicas will now slowly adjust
                # until it gets to decision_num_replicas, but we will be making
                # future decision_counter calculations on this moving
                # curr_replicas value.  Is this okay?
                self.decision_counter = 0
                decision_num_replicas = desired_num_replicas

        # Do nothing.
        else:
            self.decision_counter = 0

        return decision_num_replicas
